/*

 The MIT License (MIT)

 Copyright (C) 2015 Suryanathan Padmanabhan

 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:

 The above copyright notice and this permission notice shall be included in
 all copies or substantial portions of the Software.

 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 THE SOFTWARE.

*/

/**
 * @file etcd.hpp
 * @brief c++ language binding for etcd
 * @author Suryanathan Padmanabhan
 * @version v0.1
 * @date 2015-10-25
 *
 * C++ binding to access etcd API.
 *
 * Dependencies: libcurl, C++11
 *
 * The following abstractions are available to access the etcd API:
 *
 * a) etcd::Client implements the main client interface.
 * b) etcd::Watch implements a callback based watchdog to watch etcd key and
 * directory changes.
 *
 * Response from etcd is JSON. This implementation is agnostic to any specific
 * JSON library. If you already have a JSON library in your project, just
 * implement a wrapper similar to the one in "rapid_reply.hpp". If you would
 * like to pick another JSON implementation,
 * https://github.com/miloyip/nativejson-benchmark would be a good place to
 * start.
 *
 */

#ifndef __ETCD_HPP_INCLUDED__
#define __ETCD_HPP_INCLUDED__

#include <curl/curl.h>
#include <map>
#include <memory>
#include <sstream>
#include <string>

#ifndef ETCD_SERVER
// Enable by default or use build flags
#define ETCD_SERVER 1
#endif  // ETCD_SERVER

// Enable if you want to use the server api
#ifdef ETCD_SERVER
#include <vector>
#include <set>
#include <unistd.h>
#include <signal.h>
#include <errno.h>
#include <string.h>
#endif  // ETCD_SERVER

//#define DEBUG 1
//#define CRAZY_VERBOSE 1
//#define CRAZY_VERBOSE_STREAM stderr
#ifndef MAX_FAILURES
#define MAX_FAILURES 5
#endif

namespace etcd {

//----------------------------- EXCEPTIONS ----------------------------------

/**
 * @brief Error raised by the etcd client layer.
 *
 * The caller-supplied description is kept in #error and is what what()
 * reports; the text handed to std::runtime_error is only a placeholder.
 */
struct ClientException : public std::runtime_error {
  ClientException(const std::string& error)
      : std::runtime_error("etcd unknown exception"), error(error) {}

  /** @return the text stored in #error (valid while this object is alive). */
  const char* what() const noexcept override { return error.c_str(); }

  std::string error;
};

#ifdef ETCD_SERVER
/**
 * @brief Error raised while assembling etcd server arguments.
 *
 * The caller-supplied description is kept in #error and is what what()
 * reports; the text handed to std::runtime_error is only a placeholder.
 */
struct ServerException : public std::runtime_error {
  ServerException(const std::string& error)
      : std::runtime_error("etcd server unknown exception"), error(error) {}

  /** @return the text stored in #error (valid while this object is alive). */
  const char* what() const noexcept override { return error.c_str(); }

  std::string error;
};
#endif  // ETCD_SERVER

/**
 * @brief Error reported by etcd itself. The JSON reply wrapper throws this
 * when the server response describes a failure.
 *
 * what() yields "<msg>[<error_code>]: <cause>". The text is composed once at
 * construction and owned by the std::runtime_error base, so the pointer
 * returned by what() stays valid for the lifetime of the exception object
 * (the previous implementation returned a pointer into a temporary string).
 *
 * @param error_code numeric error code taken from the reply
 * @param msg        short error message
 * @param cause      what triggered the error
 */
struct ReplyException : public std::runtime_error {
  ReplyException(int error_code, const std::string& msg,
                 const std::string& cause)
      : std::runtime_error(_Describe(error_code, msg, cause)),
        cause(cause),
        error_code(error_code),
        msg(msg) {}

  std::string cause;
  int error_code;
  std::string msg;

 private:
  // Builds the what() text; static so it can run before members exist.
  static std::string _Describe(int code, const std::string& text,
                               const std::string& reason) {
    std::ostringstream estr;
    estr << text << "[" << code << "]: " << reason;
    return estr.str();
  }
};

// ---------------------------- TYPES ---------------------------------------

namespace internal {
// Forward declaration only: the class itself is defined further down in this
// header. Client and Watch hold it through std::unique_ptr.
class Curl;
}

// Kinds of action a reply can carry. ResponseActionMap maps the textual
// action names onto these values; no name maps onto UNKNOWN.
enum class Action {
  SET,                 // "set"
  GET,                 // "get"
  DELETE,              // "delete"
  UPDATE,              // "update"
  CREATE,              // "create"
  COMPARE_AND_SWAP,    // "compareAndSwap"
  COMPARE_AND_DELETE,  // "compareAndDelete"
  EXPIRE,              // "expire"
  UNKNOWN              // has no entry in ResponseActionMap
};

struct ResponseActionMap : public std::map<std::string, Action> {
  ResponseActionMap() {
    this->operator[]("set") = Action::SET;
    this->operator[]("get") = Action::GET;
    this->operator[]("delete") = Action::DELETE;
    this->operator[]("update") = Action::UPDATE;
    this->operator[]("create") = Action::CREATE;
    this->operator[]("compareAndSwap") = Action::COMPARE_AND_SWAP;
    this->operator[]("compareAndDelete") = Action::COMPARE_AND_DELETE;
    this->operator[]("expire") = Action::EXPIRE;
  }

  ~ResponseActionMap() {}
};

using Port = uint16_t;      // TCP port number
using Index = uint64_t;     // index value used for prevIndex / waitIndex
using TtlValue = uint64_t;  // time to live, in seconds

/**
 * @brief C++ language binding for an etcd curl client
 *
 * Every operation sends one HTTP request below "<server>:<port>/v2/keys" and
 * wraps the response body in a Reply. Failures raised by the transport are
 * rethrown as etcd::ClientException; whatever the Reply constructor throws
 * propagates unchanged.
 *
 * @tparam Reply see rapid_reply.hpp for an example Reply template. It should
 * be constructible from a std::string (the JSON response).
 */
template <typename Reply>
class Client {
 public:
  // LIFECYCLE
  /**
   * @brief Create a client for the etcd instance at http://server:port
   *
   * @param server etcd client host, without scheme or port
   * @param port etcd client port
   */
  Client(const std::string& server, const Port& port);

  // OPERATIONS

  /**
   * @brief Set a key-value pair
   *
   * @param key full prefix of the key
   * @param value the value
   *
   * @return Reply built from the server response
   */
  Reply Set(const std::string& key, const std::string& value);

  /**
   * @brief Set a key-value pair that expires after a certain number of
   * seconds.
   *
   * @param key full prefix of the key
   * @param value the value
   * @param ttl the time to live in seconds. The key-value pair will expire
   * immediately if ttl is set to zero.
   *
   * @return Reply built from the server response
   */
  Reply Set(const std::string& key, const std::string& value,
            const TtlValue& ttl);

  /**
   * @brief Url encode a given value
   *
   * @param value string to escape
   *
   * @return escaped string
   */
  std::string UrlEncode(const std::string& value);

  /**
   * @brief Url decode a given value
   *
   * @param value string to unescape
   *
   * @return unescaped string
   */
  std::string UrlDecode(const std::string& value);

  /**
   * @brief Clear the ttl on an existing key. The request sends the value
   * together with an empty ttl and prevExist=true.
   *
   * @param key full prefix of the key
   * @param value the value
   *
   * @return Reply built from the server response
   */
  Reply ClearTtl(const std::string& key, const std::string& value);

  /**
   * @brief Create an in-order key. etcd will create a sequential key inside
   * directory "dir" and associate it with value
   *
   * @param dir full prefix of the directory
   * @param value the value
   *
   * @return Reply built from the server response
   */
  Reply SetOrdered(const std::string& dir, const std::string& value);

  /**
   * @brief Get the value of a key
   *
   * @param key full prefix of the key
   *
   * @return Reply built from the server response
   */
  Reply Get(const std::string& key);

  /**
   * @brief Recursively get all the keys and directories rooted at key
   *
   * @param key the key or directory to fetch
   *
   * @return Reply built from the server response
   */
  Reply GetAll(const std::string& key);

  /**
   * @brief Enumerate the in-order keys as a sorted list
   *
   * @param dir the directory which holds the in-order keys
   *
   * @return Reply built from the server response
   */
  Reply GetOrdered(const std::string& dir);

  /**
   * @brief Delete a key-value pair
   *
   * @param key the key or empty directory to delete
   *
   * @return Reply built from the server response
   */
  Reply Delete(const std::string& key);

  /**
   * @brief Add a directory
   *
   * @param dir full prefix of the directory
   *
   * @return Reply built from the server response
   */
  Reply AddDirectory(const std::string& dir);

  /**
   * @brief Add a directory that expires after ttl seconds
   *
   * @param dir full prefix name of the directory to add
   * @param ttl expiry in seconds
   *
   * @return Reply built from the server response
   */
  Reply AddDirectory(const std::string& dir, const TtlValue& ttl);

  /**
   * @brief Update the ttl of an existing directory. The directory can only
   * be updated with a specific ttl. A ttl value of zero will expire the
   * directory immediately.
   *
   * @param dir full prefix name of the directory to update
   * @param ttl expiry in seconds
   *
   * @return Reply built from the server response
   */
  Reply UpdateDirectoryTtl(const std::string& dir, const TtlValue& ttl);

  /**
   * @brief Delete a directory and optionally its contents. If recursive is
   * false and the directory is not empty, it will throw an exception
   *
   * @param dir full prefix name of the directory
   * @param recursive flag to indicate whether we should delete child nodes.
   *
   * @return Reply built from the server response
   */
  Reply DeleteDirectory(const std::string& dir, bool recursive = false);

  /**
   * @brief Atomic compare and swap if the previous value of key matches
   * a specified value
   *
   * @param key full prefix of the key to update
   * @param value new value of the key
   * @param prevValue existing value to check. It is placed in the request
   * URL exactly as passed (see UrlEncode()).
   *
   * @return Reply built from the server response
   */
  Reply CompareAndSwapIf(const std::string& key, const std::string& value,
                         const std::string& prevValue);

  /**
   * @brief Atomically compare and swap a key if the specified previous index
   * matches the current modified index of the key
   *
   * @param key full prefix of the key to update
   * @param value new value of the key
   * @param prevIndex index to match with the modifiedIndex
   *
   * @return Reply built from the server response
   */
  Reply CompareAndSwapIf(const std::string& key, const std::string& value,
                         const Index& prevIndex);

  /**
   * @brief Atomically compare and swap a key based on whether it already
   * exists or not
   *
   * @param key full prefix of the key to update
   * @param value new value
   * @param prevExist should the key already exist or not?
   *
   * @return Reply built from the server response
   */
  Reply CompareAndSwapIf(const std::string& key, const std::string& value,
                         bool prevExist);

  /**
   * @brief Atomically compare and delete a key
   *
   * @param key full prefix of the key to delete
   * @param prevValue only delete the key if the value matches this field. It
   * is placed in the request URL exactly as passed (see UrlEncode()).
   *
   * @return Reply built from the server response
   */
  Reply CompareAndDeleteIf(const std::string& key,
                           const std::string& prevValue);

  /**
   * @brief Atomically compare and delete a key if the current modified
   * index is equal to the passed index
   *
   * @param key full prefix of the key to delete
   * @param prevIndex only delete if the modifiedIndex is equal to this field
   *
   * @return Reply built from the server response
   */
  Reply CompareAndDeleteIf(const std::string& key, const Index& prevIndex);

 private:
  // CONSTANTS
  // HTTP request types handed to the curl wrapper
  const char* kPutRequest = "PUT";
  const char* kPostRequest = "POST";
  const char* kDeleteRequest = "DELETE";

  // Field / query-parameter names
  const char* kValue = "value";
  const char* kTttl = "ttl";
  const char* kDir = "dir";
  const char* kPrevExist = "prevExist";
  const char* kPrevIndex = "prevIndex";
  const char* kPrevValue = "prevValue";
  const char* kSortedSuffix = "?recursive=true&sorted=true";

  // DATA
  // When true, _GetReply() passes the HTTP header to Reply as well. The
  // constructor never assigns this member, so without the initializer
  // _GetReply() would read an indeterminate value.
  bool enable_header_ = false;
  std::string url_;         // "http://<server>:<port>"
  std::string url_prefix_;  // url_ + "/v2/keys"
  std::unique_ptr<internal::Curl> handle_;

  // OPERATIONS
  // Wraps a raw JSON response (and optionally the header) in a Reply.
  Reply _GetReply(const std::string& json);
};

/**
 * @brief Watch abstraction that reports changes of a key or directory through
 * a callback.
 *
 * @tparam Reply JSON reply wrapper
 */
template <typename Reply>
class Watch {
 public:
  // TYPES
  /** Invoked with the reply describing each observed change. */
  using Callback = std::function<void(const Reply& r)>;

  // LIFECYCLE
  /**
   * @brief Construct an etcd::Watch object (no authentication).
   *
   * @param server etcd client URL without the port
   * @param port etcd client port
   */
  Watch(const std::string& server, const Port& port);

  /**
   * @brief Watch a key or directory and keep watching after each change.
   *
   * The caller is expected to know the current state of the key already.
   *
   * When etcd reports that the index is out of date, a plain GET is issued,
   * the callback is invoked with that response, and the watch resumes from
   * the X-Etcd-Index field of the response header.
   *
   * An empty reply (seen when the etcd server is going down or the cluster
   * is being reinitialized?) makes the watch restart, for up to MAX_FAILURES
   * failures in a row.
   *
   * @param key key or directory to watch
   * @param callback function to call when there is a change
   * @param prevIndex index value to start the watch from
   */
  void Run(const std::string& key, Callback callback,
           const Index& prevIndex = 0);

  /**
   * @brief Watch a key or directory and return right after the first change.
   *
   * Rescheduling the watch is the caller's responsibility; the modifiedIndex
   * is remembered by this object.
   *
   * The caller is expected to know the current state of the key already.
   *
   * When etcd reports that the index is out of date, a plain GET is issued,
   * the callback is invoked with that response, and the X-Etcd-Index field
   * of the response header is used to start a new watch.
   *
   * An empty reply (generated when the etcd server is going down) results in
   * an etcd::ClientException.
   *
   * @param key key or directory to watch
   * @param callback function to call when there is a change
   * @param prevIndex index value to start the watch from
   */
  void RunOnce(const std::string& key, Callback callback,
               const Index& prevIndex = 0);

 private:
  // DATA MEMBERS
  Index prev_index_;        // index of the last change seen
  std::string url_prefix_;  // "http://<server>:<port>/v2/keys"
  std::unique_ptr<internal::Curl> handle_;
};

#ifdef ETCD_SERVER
/**
 * @brief A class to construct the etcd server arguments.
 *
 * Fill in the public fields, then call GetArgs() for a command-line style
 * list or GetEnvArgs() for an environment-variable style list. Both append
 * alternating name / value entries. String fields that are empty, numeric
 * fields that are zero and URL lists that are empty are left out.
 */
struct Server {
  // CONSTANTS
  const bool STATE_NEW = true;
  const bool STATE_EXISTING = false;
  const std::string HTTP_PREFIX = "http://";

  // TYPES
  typedef std::set<std::string> UrlList;
  typedef std::vector<std::string> ArgList;

  struct Peer {
    std::string name;
    std::string url;
    // 0 means "use Server::peer_port". NOTE(review): has no default
    // initializer, so callers must set it explicitly.
    uint16_t port;
  };
  typedef std::vector<Peer> PeerList;

  // Member flags
  uint16_t peer_port = 0;    // global peer port
  uint16_t client_port = 0;  // global client port

  std::string name;
  std::string client_url;
  std::string peer_url;

  std::string data_dir;
  std::string wal_dir;
  uint64_t snapshot_count = 0;
  uint64_t heartbeat_interval_ms = 0;
  uint64_t election_timeout_ms = 0;
  UrlList listen_peer_urls;
  UrlList listen_client_urls;
  uint64_t max_snapshots = 0;
  uint64_t max_wals = 0;
  UrlList cors;

  // Clustering flags
  UrlList initial_advertise_peer_urls;
  PeerList initial_cluster;
  bool initial_cluster_state = STATE_NEW;
  std::string initial_cluster_token;
  UrlList advertise_client_urls;
  std::string discovery;
  std::string discovery_srv;
  std::string discovery_fallback;
  std::string discovery_proxy;

  // Proxy flags
  bool proxy = false;
  uint64_t proxy_failure_wait = 0;
  uint64_t proxy_refresh_interval = 0;
  uint64_t proxy_dial_timeout = 0;
  uint64_t proxy_write_timeout = 0;
  uint64_t proxy_read_timeout = 0;

  // NOTE(review): not initialised and not used by anything in this struct.
  pid_t pid_;

  // Joins the elements with ",", wrapping each one as prefix + elem + suffix.
  template <typename List>
  std::string _ListJoin(const List& list, const std::string& prefix,
                        const std::string& suffix) const {
    std::string ret;
    bool first = true;
    for (auto const& elem : list) {
      if (!first) ret += ",";
      first = false;
      ret += prefix + elem + suffix;
    }
    return ret;
  }

  // "http://<url>[:<port>]" for every url, comma separated; a zero port adds
  // no ":<port>" suffix.
  std::string _GetUrlList(const UrlList& urls, uint16_t port) const {
    const std::string suffix = (port ? ":" + std::to_string(port) : "");
    return _ListJoin(urls, HTTP_PREFIX, suffix);
  }

  // "<name>=http://<url>[:<port>]" for every initial_cluster entry, comma
  // separated. A peer's own port wins over the global peer_port; when both
  // are zero no port is added.
  std::string _GetPeerList() const {
    std::string peer_list;
    bool first = true;
    for (auto const& peer : initial_cluster) {
      if (!first) peer_list += ",";
      first = false;
      peer_list += peer.name + "=" + HTTP_PREFIX + peer.url;
      if (peer.port)
        peer_list += ":" + std::to_string(peer.port);
      else if (peer_port)
        peer_list += ":" + std::to_string(peer_port);
    }
    return peer_list;
  }

  // Appends name and value unless value is empty.
  void _SetArg(const std::string& name, const std::string& value,
               ArgList& args) const {
    if (!value.empty()) {
      args.push_back(name);
      args.push_back(value);
    }
  }

  // Appends name and the decimal value unless value is zero.
  void _SetArg(const std::string& name, uint64_t value, ArgList& args) const {
    if (value) {
      args.push_back(name);
      args.push_back(std::to_string(value));
    }
  }

  // Appends name and the joined URL list unless the list is empty.
  void _SetArg(const std::string& name, const UrlList& urls, uint16_t port,
               ArgList& args) const {
    if (urls.size()) {
      args.push_back(name);
      args.push_back(_GetUrlList(urls, port));
    }
  }

  // Member flags as command-line arguments; throws ServerException when
  // name is empty.
  void _GetMemberArgs(ArgList& args) const {
    if (name.empty()) {
      throw ServerException("name not specified");
    }
    _SetArg("-name", name, args);
    _SetArg("-data-dir", data_dir, args);
    _SetArg("-wal-dir", wal_dir, args);
    _SetArg("-snapshot-count", snapshot_count, args);
    _SetArg("-heartbeat-interval", heartbeat_interval_ms, args);
    _SetArg("-election-timeout", election_timeout_ms, args);
    _SetArg("-listen-peer-urls", listen_peer_urls, peer_port, args);
    _SetArg("-listen-client-urls", listen_client_urls, client_port, args);
    _SetArg("-max-snapshots", max_snapshots, args);
    _SetArg("-max-wals", max_wals, args);
    _SetArg("-cors", _ListJoin(cors, "", ""), args);
  }

  // Clustering flags as command-line arguments. The cluster state is always
  // emitted because its value ("new" / "existing") is never empty.
  void _GetClusterArgs(ArgList& args) const {
    _SetArg("-initial-advertise-peer-urls", initial_advertise_peer_urls,
            peer_port, args);
    _SetArg("-initial-cluster", _GetPeerList(), args);
    _SetArg("-initial-cluster-state",
            initial_cluster_state ? "new" : "existing", args);
    _SetArg("-initial-cluster-token", initial_cluster_token, args);
    _SetArg("-advertise-client-urls", advertise_client_urls, client_port, args);
    _SetArg("-discovery", discovery, args);
    _SetArg("-discovery-srv", discovery_srv, args);
    _SetArg("-discovery-fallback", discovery_fallback, args);
    _SetArg("-discovery-proxy", discovery_proxy, args);
  }

  // Proxy flags as command-line arguments; "-proxy on" only when proxy is
  // true (the empty string is dropped by _SetArg).
  void _GetProxyArgs(ArgList& args) const {
    _SetArg("-proxy", (proxy ? "on" : ""), args);
    _SetArg("-proxy-failure-wait", proxy_failure_wait, args);
    _SetArg("-proxy-refresh-interval", proxy_refresh_interval, args);
    _SetArg("-proxy-dial-timeout", proxy_dial_timeout, args);
    _SetArg("-proxy-write-timeout", proxy_write_timeout, args);
    _SetArg("-proxy-read-timeout", proxy_read_timeout, args);
  }

  void _GetArgs(ArgList& args) const {
    _GetMemberArgs(args);
    _GetClusterArgs(args);
    _GetProxyArgs(args);
  }

  // Member flags as environment variable name / value entries; throws
  // ServerException when name is empty.
  void _GetMemberEnvArgs(ArgList& args) const {
    if (name.empty()) {
      throw ServerException("name not specified");
    }
    _SetArg("ETCD_NAME", name, args);
    _SetArg("ETCD_DATA_DIR", data_dir, args);
    _SetArg("ETCD_WAL_DIR", wal_dir, args);
    _SetArg("ETCD_SNAPSHOT_COUNT", snapshot_count, args);
    _SetArg("ETCD_HEARTBEAT_INTERVAL", heartbeat_interval_ms, args);
    _SetArg("ETCD_ELECTION_TIMEOUT", election_timeout_ms, args);
    _SetArg("ETCD_LISTEN_PEER_URLS", listen_peer_urls, peer_port, args);
    _SetArg("ETCD_LISTEN_CLIENT_URLS", listen_client_urls, client_port, args);
    _SetArg("ETCD_MAX_SNAPSHOTS", max_snapshots, args);
    _SetArg("ETCD_MAX_WALS", max_wals, args);
    _SetArg("ETCD_CORS", _ListJoin(cors, "", ""), args);
  }

  // Clustering flags as environment variable name / value entries.
  void _GetClusterEnvArgs(ArgList& args) const {
    _SetArg("ETCD_INITIAL_ADVERTISE_PEER_URLS", initial_advertise_peer_urls,
            peer_port, args);
    _SetArg("ETCD_INITIAL_CLUSTER", _GetPeerList(), args);
    _SetArg("ETCD_INITIAL_CLUSTER_STATE",
            initial_cluster_state ? "new" : "existing", args);
    _SetArg("ETCD_INITIAL_CLUSTER_TOKEN", initial_cluster_token, args);
    _SetArg("ETCD_ADVERTISE_CLIENT_URLS", advertise_client_urls, client_port,
            args);
    _SetArg("ETCD_DISCOVERY", discovery, args);
    _SetArg("ETCD_DISCOVERY_SRV", discovery_srv, args);
    _SetArg("ETCD_DISCOVERY_FALLBACK", discovery_fallback, args);
    _SetArg("ETCD_DISCOVERY_PROXY", discovery_proxy, args);
  }

  // Proxy flags as environment variable name / value entries.
  void _GetProxyEnvArgs(ArgList& args) const {
    _SetArg("ETCD_PROXY", (proxy ? "on" : ""), args);
    _SetArg("ETCD_PROXY_FAILURE_WAIT", proxy_failure_wait, args);
    _SetArg("ETCD_PROXY_REFRESH_INTERVAL", proxy_refresh_interval, args);
    _SetArg("ETCD_PROXY_DIAL_TIMEOUT", proxy_dial_timeout, args);
    _SetArg("ETCD_PROXY_WRITE_TIMEOUT", proxy_write_timeout, args);
    _SetArg("ETCD_PROXY_READ_TIMEOUT", proxy_read_timeout, args);
  }

  void _GetEnvArgs(ArgList& args) const {
    _GetMemberEnvArgs(args);
    _GetClusterEnvArgs(args);
    _GetProxyEnvArgs(args);
  }

  // ToDo Add TLS args

  /**
   * @brief Get Arguments for passing to etcd executable
   *
   * @param args output argument list; entries are appended
   *
   * @throws ServerException when name is empty
   */
  void GetArgs(ArgList& args) const { _GetArgs(args); }

  /**
   * @brief Get environment variables that can be set before starting
   * etcd
   *
   * @param args output environment variable, value list; entries are appended
   *
   * @throws ServerException when name is empty
   */
  void GetEnvArgs(ArgList& args) const { _GetEnvArgs(args); }
};

#endif  // ETCD_SERVER
//------------------------------ INTERNAL TYPES -----------------------------

namespace internal {

/**
 * @brief Curl-layer error that carries only a free-form description.
 *
 * what() reports the text stored in #error; the string handed to
 * std::runtime_error is only a placeholder.
 */
struct CurlUnknownException : public std::runtime_error {
  CurlUnknownException(const std::string& error)
      : std::runtime_error("curl unknown exception"), error(error) {}

  /** @return the text stored in #error (valid while this object is alive). */
  const char* what() const noexcept override { return error.c_str(); }

  std::string error;
};

struct CurlException : public std::runtime_error {
  CurlException(CURLcode errorCode, const std::string& msg)
      : std::runtime_error("curl exception"), error_code(errorCode), msg(msg) {}

  virtual const char* what() const throw() {
    std::ostringstream estr;
    estr << msg << " [code: " << error_code << "] ";
    estr << curl_easy_strerror(error_code);
    return estr.str().c_str();
  }

  CURLcode error_code;
  std::string msg;
};

// Name -> value fields sent along with a request.
using CurlOptions = std::map<std::string, std::string>;

// HTTP transport used by Client and Watch. Only the declaration is visible
// here; the member definitions live elsewhere.
class Curl {
 public:
  // LIFECYCLE
  Curl();
  ~Curl();

  // OPERATIONS
  // Called by Client/Watch with a complete URL; the returned string is
  // handed to Reply as the response body.
  std::string Get(const std::string& url);

  // Called by Client with a complete URL, a request type ("PUT", "POST" or
  // "DELETE") and the fields to send; the returned string is handed to Reply
  // as the response body.
  std::string Set(const std::string& url, const std::string& type,
                  const CurlOptions& options);

  std::string UrlEncode(const std::string& value);
  std::string UrlDecode(const std::string& value);

  // Watch::Run switches this on before a GET whose header it wants to read
  // and off again afterwards.
  void EnableHeader(bool onOff);

  // Watch::Run scans the returned text line by line for "X-Etcd-Index: ".
  std::string GetHeader();

  // callback from 'C' functions
  size_t WriteCb(void* buffer_p, size_t size, size_t nmemb) throw();
  size_t HeaderCb(void* buffer_p, size_t size, size_t nmemb) throw();

 private:
  // DATA MEMBERS
  CURL* handle_;
  std::ostringstream write_stream_;
  std::ostringstream header_stream_;
  bool enable_header_;

  // LIFECYCLE
  // Declared private, presumably to forbid copying.
  // NOTE(review): confirm no definitions exist, then consider `= delete`.
  Curl(const Curl& rhs);
  void operator=(const Curl&& rhs);

  // OPERATIONS
  void _CheckError(CURLcode err, const std::string& msg);
  void _ResetHandle();

  void _SetCommonOptions(const std::string& url);

  void _SetGetOptions(const std::string& url);

  void _SetPostOptions(const std::string& url, const std::string& type,
                       const CurlOptions& options);
};
}

//------------------------------- LIFECYCLE ----------------------------------

/**
 * Builds the base URL "http://<server>:<port>" and the key prefix
 * "<base>/v2/keys", and creates the curl wrapper.
 *
 * The port is formatted with std::to_string rather than through a stream:
 * stream output of integers follows the global locale, so a locale with
 * digit grouping would have produced an invalid URL such as "host:2,379".
 *
 * @throws ClientException for any std::exception raised during construction
 */
template <typename Reply>
Client<Reply>::Client(const std::string& server, const Port& port) try
    : handle_(new internal::Curl()) {
  url_ = "http://" + server + ":" + std::to_string(port);
  url_prefix_ = url_ + "/v2/keys";
} catch (const std::exception& e) {
  throw ClientException(e.what());
}

//------------------------------- OPERATIONS ---------------------------------
// PUT `value` under `key`. Transport errors become ClientException; the reply
// is built outside the try block so its own exceptions pass through as-is.
template <typename Reply>
Reply Client<Reply>::Set(const std::string& key, const std::string& value) {
  std::string response;
  try {
    const std::string target = url_prefix_ + key;
    response = handle_->Set(target, kPutRequest, {{kValue, value}});
  } catch (const std::exception& failure) {
    throw ClientException(failure.what());
  }
  return _GetReply(response);
}

// PUT `value` under `key` together with a ttl (seconds, sent as decimal
// text). Transport errors become ClientException; the reply is built outside
// the try block so its own exceptions pass through as-is.
template <typename Reply>
Reply Client<Reply>::Set(const std::string& key, const std::string& value,
                         const TtlValue& ttl) {
  std::string response;
  try {
    const std::string target = url_prefix_ + key;
    response = handle_->Set(target, kPutRequest,
                            {{kValue, value}, {kTttl, std::to_string(ttl)}});
  } catch (const std::exception& failure) {
    throw ClientException(failure.what());
  }
  return _GetReply(response);
}

// PUT `value` under `key` with an empty ttl and prevExist=true. Transport
// errors become ClientException; the reply is built outside the try block so
// its own exceptions pass through as-is.
template <typename Reply>
Reply Client<Reply>::ClearTtl(const std::string& key,
                              const std::string& value) {
  std::string response;
  try {
    const std::string target = url_prefix_ + key;
    response = handle_->Set(
        target, kPutRequest,
        {{kValue, value}, {kTttl, ""}, {kPrevExist, "true"}});
  } catch (const std::exception& failure) {
    throw ClientException(failure.what());
  }
  return _GetReply(response);
}

// Forwards to the curl wrapper's UrlEncode; exceptions are not translated.
template <typename Reply>
std::string Client<Reply>::UrlEncode(const std::string& value) {
  std::string escaped = handle_->UrlEncode(value);
  return escaped;
}

// Forwards to the curl wrapper's UrlDecode; exceptions are not translated.
template <typename Reply>
std::string Client<Reply>::UrlDecode(const std::string& value) {
  std::string unescaped = handle_->UrlDecode(value);
  return unescaped;
}

// POST `value` to directory `dir`. Transport errors become ClientException;
// the reply is built outside the try block so its own exceptions pass
// through as-is.
template <typename Reply>
Reply Client<Reply>::SetOrdered(const std::string& dir,
                                const std::string& value) {
  std::string response;
  try {
    const std::string target = url_prefix_ + dir;
    response = handle_->Set(target, kPostRequest, {{kValue, value}});
  } catch (const std::exception& failure) {
    throw ClientException(failure.what());
  }
  return _GetReply(response);
}

// GET `key`. Transport errors become ClientException; the reply is built
// outside the try block so its own exceptions pass through as-is.
template <typename Reply>
Reply Client<Reply>::Get(const std::string& key) {
  std::string response;
  try {
    const std::string target = url_prefix_ + key;
    response = handle_->Get(target);
  } catch (const std::exception& failure) {
    throw ClientException(failure.what());
  }
  return _GetReply(response);
}

// GET `key` with "?recursive=true" appended. Transport errors become
// ClientException; the reply is built outside the try block so its own
// exceptions pass through as-is.
template <typename Reply>
Reply Client<Reply>::GetAll(const std::string& key) {
  std::string response;
  try {
    const std::string target = url_prefix_ + key + "?recursive=true";
    response = handle_->Get(target);
  } catch (const std::exception& failure) {
    throw ClientException(failure.what());
  }
  return _GetReply(response);
}

// GET `dir` with kSortedSuffix ("?recursive=true&sorted=true") appended.
// Transport errors become ClientException; the reply is built outside the
// try block so its own exceptions pass through as-is.
template <typename Reply>
Reply Client<Reply>::GetOrdered(const std::string& dir) {
  std::string response;
  try {
    std::string target = url_prefix_ + dir;
    target += kSortedSuffix;
    response = handle_->Get(target);
  } catch (const std::exception& failure) {
    throw ClientException(failure.what());
  }
  return _GetReply(response);
}

// DELETE `key`, sending no fields. Transport errors become ClientException;
// the reply is built outside the try block so its own exceptions pass
// through as-is.
template <typename Reply>
Reply Client<Reply>::Delete(const std::string& key) {
  std::string response;
  try {
    const std::string target = url_prefix_ + key;
    response = handle_->Set(target, kDeleteRequest, {});
  } catch (const std::exception& failure) {
    throw ClientException(failure.what());
  }
  return _GetReply(response);
}

// PUT `dir` with dir=true. Transport errors become ClientException; the
// reply is built outside the try block so its own exceptions pass through
// as-is.
template <typename Reply>
Reply Client<Reply>::AddDirectory(const std::string& dir) {
  std::string response;
  try {
    const std::string target = url_prefix_ + dir;
    response = handle_->Set(target, kPutRequest, {{kDir, "true"}});
  } catch (const std::exception& failure) {
    throw ClientException(failure.what());
  }
  return _GetReply(response);
}

// PUT `dir` with dir=true and a ttl (seconds, sent as decimal text).
// Transport errors become ClientException; the reply is built outside the
// try block so its own exceptions pass through as-is.
template <typename Reply>
Reply Client<Reply>::AddDirectory(const std::string& dir, const TtlValue& ttl) {
  std::string response;
  try {
    const std::string target = url_prefix_ + dir;
    response = handle_->Set(target, kPutRequest,
                            {{kDir, "true"}, {kTttl, std::to_string(ttl)}});
  } catch (const std::exception& failure) {
    throw ClientException(failure.what());
  }
  return _GetReply(response);
}

// PUT `dir` with dir=true, the new ttl and prevExist=true. Transport errors
// become ClientException; the reply is built outside the try block so its
// own exceptions pass through as-is.
template <typename Reply>
Reply Client<Reply>::UpdateDirectoryTtl(const std::string& dir,
                                        const TtlValue& ttl) {
  std::string response;
  try {
    const std::string target = url_prefix_ + dir;
    response = handle_->Set(target, kPutRequest,
                            {{kDir, "true"},
                             {kTttl, std::to_string(ttl)},
                             {kPrevExist, "true"}});
  } catch (const std::exception& failure) {
    throw ClientException(failure.what());
  }
  return _GetReply(response);
}

// DELETE `dir` with "?dir=true", plus "&recursive=true" when requested.
// Transport errors become ClientException; the reply is built outside the
// try block so its own exceptions pass through as-is.
template <typename Reply>
Reply Client<Reply>::DeleteDirectory(const std::string& dir, bool recursive) {
  std::string target = url_prefix_ + dir + "?dir=true";
  if (recursive) {
    target += "&recursive=true";
  }

  std::string response;
  try {
    response = handle_->Set(target, kDeleteRequest, {});
  } catch (const std::exception& failure) {
    throw ClientException(failure.what());
  }
  return _GetReply(response);
}

// PUT `value` under `key` with "?prevValue=<prevValue>" in the URL. prevValue
// is inserted exactly as passed. Transport errors become ClientException;
// the reply is built outside the try block so its own exceptions pass
// through as-is.
template <typename Reply>
Reply Client<Reply>::CompareAndSwapIf(const std::string& key,
                                      const std::string& value,
                                      const std::string& prevValue) {
  std::string target = url_prefix_ + key;
  target += "?";
  target += kPrevValue;
  target += "=";
  target += prevValue;

  std::string response;
  try {
    response = handle_->Set(target, kPutRequest, {{kValue, value}});
  } catch (const std::exception& failure) {
    throw ClientException(failure.what());
  }
  return _GetReply(response);
}

// PUT `value` under `key` with "?prevIndex=<prevIndex>" in the URL. Transport
// errors become ClientException; the reply is built outside the try block so
// its own exceptions pass through as-is.
template <typename Reply>
Reply Client<Reply>::CompareAndSwapIf(const std::string& key,
                                      const std::string& value,
                                      const Index& prevIndex) {
  std::string target = url_prefix_ + key;
  target += "?";
  target += kPrevIndex;
  target += "=";
  target += std::to_string(prevIndex);

  std::string response;
  try {
    response = handle_->Set(target, kPutRequest, {{kValue, value}});
  } catch (const std::exception& failure) {
    throw ClientException(failure.what());
  }
  return _GetReply(response);
}

// PUT `value` under `key` with "?prevExist=true" or "?prevExist=false" in the
// URL. Transport errors become ClientException; the reply is built outside
// the try block so its own exceptions pass through as-is.
template <typename Reply>
Reply Client<Reply>::CompareAndSwapIf(const std::string& key,
                                      const std::string& value,
                                      bool prevExist) {
  std::string target = url_prefix_ + key;
  target += "?";
  target += kPrevExist;
  target += "=";
  if (prevExist) {
    target += "true";
  } else {
    target += "false";
  }

  std::string response;
  try {
    response = handle_->Set(target, kPutRequest, {{kValue, value}});
  } catch (const std::exception& failure) {
    throw ClientException(failure.what());
  }
  return _GetReply(response);
}

// DELETE `key` with "?prevValue=<prevValue>" in the URL. prevValue is
// inserted exactly as passed. Transport errors become ClientException; the
// reply is built outside the try block so its own exceptions pass through
// as-is.
template <typename Reply>
Reply Client<Reply>::CompareAndDeleteIf(const std::string& key,
                                        const std::string& prevValue) {
  std::string target = url_prefix_ + key;
  target += "?";
  target += kPrevValue;
  target += "=";
  target += prevValue;

  std::string response;
  try {
    response = handle_->Set(target, kDeleteRequest, {});
  } catch (const std::exception& failure) {
    throw ClientException(failure.what());
  }
  return _GetReply(response);
}

// DELETE `key` with "?prevIndex=<prevIndex>" in the URL. Transport errors
// become ClientException; the reply is built outside the try block so its
// own exceptions pass through as-is.
template <typename Reply>
Reply Client<Reply>::CompareAndDeleteIf(const std::string& key,
                                        const Index& prevIndex) {
  std::string target = url_prefix_ + key;
  target += "?";
  target += kPrevIndex;
  target += "=";
  target += std::to_string(prevIndex);

  std::string response;
  try {
    response = handle_->Set(target, kDeleteRequest, {});
  } catch (const std::exception& failure) {
    throw ClientException(failure.what());
  }
  return _GetReply(response);
}

//------------------------------ OPERATIONS ----------------------------------

// Wraps the JSON body in a Reply; when enable_header_ is set, the header
// text from the curl wrapper is passed as the first constructor argument.
template <typename Reply>
Reply Client<Reply>::_GetReply(const std::string& json) {
  if (!enable_header_) {
    return Reply(json);
  }
  return Reply(handle_->GetHeader(), json);
}

//------------------------------- LIFECYCLE ----------------------------------

/**
 * Builds the key prefix "http://<server>:<port>/v2/keys", creates the curl
 * wrapper and starts with a previous index of 0.
 *
 * The port is formatted with std::to_string rather than through a stream:
 * stream output of integers follows the global locale, so a locale with
 * digit grouping would have produced an invalid URL such as "host:2,379".
 * The initializers are listed in member declaration order, which is the
 * order they run in regardless of how they are written.
 *
 * @throws ClientException for any std::exception raised during construction
 */
template <typename Reply>
Watch<Reply>::Watch(const std::string& server, const Port& port) try
    : prev_index_(0),
      handle_(new internal::Curl()) {
  url_prefix_ = "http://" + server + ":" + std::to_string(port) + "/v2/keys";
} catch (const std::exception& e) {
  throw ClientException(e.what());
}

//------------------------------- OPERATIONS ---------------------------------

/**
 * @brief Watches @p key in a loop, invoking @p callback for every response.
 *
 * The watch URL is "<url_prefix_><key>?wait=true". When an index is known
 * (either @p prevIndex or the index remembered from an earlier watch),
 * "&waitIndex=<index + 1>" is appended so the watch resumes after that index.
 *
 * Every response is turned into a Reply and handed to @p callback; the
 * remembered index is then taken from Reply::get_modified_index() and the
 * failure budget is reset to MAX_FAILURES.
 *
 * Failure handling:
 *  - ReplyException with error_code 401 (index out of date): the key is read
 *    once without waiting, @p callback is invoked with that state, and the
 *    watch index is re-synchronised from the "X-Etcd-Index: " response
 *    header. Errors during this recovery are ignored. The attempt still
 *    counts as a failure.
 *  - Any other ReplyException or std::exception (including one escaping
 *    @p callback) counts as a failure.
 *
 * The loop has no other exit, so this function never returns normally.
 *
 * @param key        key or directory path appended to the URL prefix
 * @param callback   invoked with each Reply
 * @param prevIndex  index to resume after; 0 keeps the remembered index
 * @throws ClientException once MAX_FAILURES consecutive attempts have failed
 */
template <typename Reply>
void Watch<Reply>::Run(const std::string& key, Watch::Callback callback,
                       const Index& prevIndex) {
  const std::string watch_url_base = url_prefix_ + key + "?wait=true";
  const std::string wait_url_base = watch_url_base + "&waitIndex=";

  // An explicit index from the caller replaces the remembered one
  if (prevIndex) {
    prev_index_ = prevIndex;
  }

  // The index must travel in the waitIndex parameter; appending it straight
  // to "?wait=true" would yield a malformed query string.
  std::string watch_url =
      prev_index_ ? wait_url_base + std::to_string(prev_index_ + 1)
                  : watch_url_base;

  int max_failures = MAX_FAILURES;

  while (max_failures) {
    try {
      // Watch for a change
      std::string ret = handle_->Get(watch_url);

      // Construct a reply and invoke the callback
      Reply r(ret);
      callback(r);

      // Update the prevIndex and the watch url
      prev_index_ = r.get_modified_index();
      watch_url = wait_url_base + std::to_string(prev_index_ + 1);

      // reset failures on a successful watch response
      max_failures = MAX_FAILURES;

    } catch (const ReplyException& e) {
      if (e.error_code == 401) {
        // We got an index out of date.
        try {
          // Enable curl headers
          handle_->EnableHeader(true);

          // Get the current state and call back
          std::string ret = handle_->Get(url_prefix_ + key);
          Reply r(ret);
          callback(r);

          // Get the new index from the curl header and start a watch
          std::istringstream stream(handle_->GetHeader());
          const std::string etcd_index_label("X-Etcd-Index: ");
          std::string line;
          while (std::getline(stream, line)) {
            // Keep the find() result itself; comparing inside the assignment
            // would store the boolean instead of the match position.
            const std::string::size_type pos = line.find(etcd_index_label);
            if (pos != std::string::npos) {
              // stoull stops at the first non-digit, so a trailing '\r' left
              // by getline is harmless; it also covers indexes beyond int.
              prev_index_ = static_cast<Index>(
                  std::stoull(line.substr(pos + etcd_index_label.length())));
              break;
            }
          }
          handle_->EnableHeader(false);
          watch_url = wait_url_base + std::to_string(prev_index_ + 1);
        } catch (...) {
          // Best-effort recovery: the failure is accounted for below.
          // NOTE(review): if anything above throws, EnableHeader(false) is
          // skipped and header capture stays on -- confirm this is acceptable.
        }
      }
      max_failures--;  // still consider as a failure
    } catch (const std::exception&) {
      // Possibly timed out and we didn't get a previous index
      // TODO: check timeout options for libcurl
      max_failures--;
    }
  }

  // Only reachable once the failure budget is exhausted
  throw ClientException("watch failed or timedout");
}

/**
 * @brief Performs a single watch on @p key and invokes @p callback once.
 *
 * The watch URL is "<url_prefix_><key>?wait=true". When an index is known
 * (either @p prevIndex or the index remembered from an earlier watch),
 * "&waitIndex=<index + 1>" is appended so the watch resumes after that index.
 *
 * On a response, a Reply is constructed, @p callback is invoked and the
 * remembered index is taken from Reply::get_modified_index().
 *
 * Failure handling:
 *  - ReplyException with error_code 401 (index out of date): the key is read
 *    once without waiting, @p callback is invoked with that state, and the
 *    remembered index is re-synchronised from the "X-Etcd-Index: " response
 *    header. Errors during this recovery are ignored.
 *  - A ReplyException with any other error code is ignored; the function
 *    returns without having updated the remembered index.
 *  - Any other std::exception is rethrown as ClientException.
 *
 * @param key        key or directory path appended to the URL prefix
 * @param callback   invoked with the Reply
 * @param prevIndex  index to resume after; 0 keeps the remembered index
 * @throws ClientException wrapping the message of a non-Reply std::exception
 */
template <typename Reply>
void Watch<Reply>::RunOnce(const std::string& key, Watch::Callback callback,
                           const Index& prevIndex) {
  const std::string watch_url_base = url_prefix_ + key + "?wait=true";
  const std::string wait_url_base = watch_url_base + "&waitIndex=";

  // An explicit index from the caller replaces the remembered one
  if (prevIndex) {
    prev_index_ = prevIndex;
  }

  // The index must travel in the waitIndex parameter; appending it straight
  // to "?wait=true" would yield a malformed query string.
  const std::string watch_url =
      prev_index_ ? wait_url_base + std::to_string(prev_index_ + 1)
                  : watch_url_base;

  try {
    // Watch for a change
    std::string ret = handle_->Get(watch_url);

    // Construct a reply and invoke the callback
    Reply r(ret);
    callback(r);

    // Remember where the next watch should resume
    prev_index_ = r.get_modified_index();

  } catch (const ReplyException& e) {
    if (e.error_code == 401) {
      // We got an index out of date.
      try {
        // Enable curl headers
        handle_->EnableHeader(true);

        // Get the current state and call back
        std::string ret = handle_->Get(url_prefix_ + key);
        Reply r(ret);
        callback(r);

        // Get the new index from the curl header for the next watch
        std::istringstream stream(handle_->GetHeader());
        const std::string etcd_index_label("X-Etcd-Index: ");
        std::string line;
        while (std::getline(stream, line)) {
          // Keep the find() result itself; comparing inside the assignment
          // would store the boolean instead of the match position.
          const std::string::size_type pos = line.find(etcd_index_label);
          if (pos != std::string::npos) {
            // stoull stops at the first non-digit, so a trailing '\r' left
            // by getline is harmless; it also covers indexes beyond int.
            prev_index_ = static_cast<Index>(
                std::stoull(line.substr(pos + etcd_index_label.length())));
            break;
          }
        }
        handle_->EnableHeader(false);
      } catch (...) {
        // Best-effort recovery: errors here are deliberately not reported.
        // NOTE(review): if anything above throws, EnableHeader(false) is
        // skipped and header capture stays on -- confirm this is acceptable.
      }
    }
  } catch (const std::exception& e) {
    throw ClientException("failed with: " + std::string(e.what()));
  }
}

//--------------------------- INTERNAL IMPL ---------------------------------

namespace internal {

// Forward declarations of the C-linkage callbacks; their definitions are not
// in this part of the file.
// NOTE(review): the signatures follow the shape of libcurl's write, header
// and debug callbacks, with an internal::Curl* as the user pointer of the
// first two -- confirm against where they are registered.
extern "C" {

size_t _WriteCb(void* buffer_p, size_t size, size_t nmemb,
                internal::Curl* curl_p);

size_t _HeaderCb(void* buffer_p, size_t size, size_t nmemb,
                 internal::Curl* curl_p);

#ifdef DEBUG
// Only declared in DEBUG builds
int _CurlTrace(CURL* handle, curl_infotype type, char* data, size_t size,
               void* userp);
#endif

}  // extern "C"

}  // namespace internal
}  // namespace etcd

#endif  // __ETCD_HPP_INCLUDED__
